torosent commented Jan 27, 2026

Summary

What changed?

  • Added inline channel caching in ConfigureGrpcChannel class using ConcurrentDictionary<string, Lazy<GrpcChannel>>
  • Made ConfigureGrpcChannel implement IDisposable to properly dispose channels when the ServiceProvider is disposed
  • Channels are cached per configuration key (options name + endpoint + task hub)

Why is this change needed?

  • When the Durable Task Scheduler service is unreachable, the IConfigureNamedOptions<DurableTaskSchedulerOptions>.Configure() method was being called multiple times (on each retry)
  • Each call created a new GrpcChannel, which allocates internal HTTP handlers and socket connections
  • The channels were never disposed because when options.Channel is pre-set (AzureManaged case), GetCallInvoker() returns default for the disposable reference
  • This caused handle/resource leaks that accumulated over time, especially when the service was unavailable

Issues / work items

  • Related #N/A (fixes handle leak issue)

Project checklist

  • Release notes are not required for the next release
    • Otherwise: Notes added to release_notes.md
  • Backport is not required
    • Otherwise: Backport tracked by issue/PR #issue_or_pr
  • All required tests have been added/updated (unit tests, E2E tests)
  • Breaking change?
    • If yes: No breaking changes - internal implementation change only, public API unchanged

AI-assisted code disclosure (required)

Was an AI tool used? (select one)

  • No
  • Yes, AI helped write parts of this PR (e.g., GitHub Copilot)
  • Yes, an AI agent generated most of this PR

If AI was used:

  • Tool(s): GitHub Copilot (Claude)
  • AI-assisted areas/files:
    • src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs (modified)
    • src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs (modified)
    • test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs (modified)
    • test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs (modified)
  • What you changed after AI output: Simplified from separate GrpcChannelCache class to inline ConcurrentDictionary<string, Lazy<GrpcChannel>> per reviewer feedback

AI verification (required if AI was used):

  • I understand the code and can explain it
  • I verified referenced APIs/types exist and are correct
  • I reviewed edge cases/failure paths (timeouts, retries, cancellation, exceptions)
  • I reviewed concurrency/async behavior
  • I checked for unintended breaking or behavior changes

Testing

Automated tests

  • Result: Passed
    • test/Client/AzureManaged.Tests - 33 tests passed
    • test/Worker/AzureManaged.Tests - 31 tests passed
    • test/Shared/AzureManaged.Tests - 20 tests passed
    • Total: 84 tests passed

Manual validation (only if runtime/behavior changed)

  • Not performed (behavior change is internal - channels are now cached and disposed properly)

Notes for reviewers

  • The ConfigureGrpcChannel class now uses ConcurrentDictionary<string, Lazy<GrpcChannel>> for thread-safe channel caching
  • Lazy<GrpcChannel> ensures thread-safe initialization without holding locks during channel creation (avoids potential deadlocks)
  • Channel keys include options name, endpoint, and task hub to ensure proper isolation
  • Fire-and-forget ShutdownAsync() is called before Dispose() for graceful shutdown of in-flight RPCs
  • The volatile keyword on the disposed field ensures proper memory visibility when checking disposal state from multiple threads
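
For readers following along, the caching pattern described in these notes can be sketched roughly as follows. This is a minimal, self-contained illustration rather than the code in this PR: `StubChannel` is a hypothetical stand-in for `GrpcChannel`, and `ChannelCache` is an illustrative name, not a type in the repository.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for GrpcChannel (assumption for illustration only).
sealed class StubChannel : IDisposable
{
    public bool IsDisposed { get; private set; }

    public void Dispose() => this.IsDisposed = true;
}

// Illustrative cache; not the PR's ConfigureGrpcChannel class.
sealed class ChannelCache : IDisposable
{
    readonly ConcurrentDictionary<string, Lazy<StubChannel>> channels = new();

    public StubChannel GetOrCreate(string key) =>
        // GetOrAdd may invoke the delegate more than once under contention,
        // but only one Lazy is stored, and only that Lazy creates a channel.
        this.channels.GetOrAdd(key, _ => new Lazy<StubChannel>(() => new StubChannel())).Value;

    public void Dispose()
    {
        foreach (Lazy<StubChannel> lazy in this.channels.Values)
        {
            // Skip entries whose channel was never materialized.
            if (lazy.IsValueCreated)
            {
                lazy.Value.Dispose();
            }
        }

        this.channels.Clear();
    }
}
```

Calling `GetOrCreate` twice with the same key returns the same instance, and `Dispose` only touches channels whose `Lazy` value was actually created.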

- Add GrpcChannelCache for thread-safe channel caching by endpoint
- Update Client/Worker extensions to use shared cache
- Ensure channels are disposed when ServiceProvider disposes
- Add comprehensive unit and integration tests

cgillum commented Jan 27, 2026

I asked Copilot CLI to review this with the following prompt since the currently proposed solution looks far more complicated than I was expecting.

Please review this PR. I'm especially interested in knowing if a significantly simpler solution is possible

Here's what the response was:

   PR Review Summary

   Overall Assessment: The fix correctly addresses the GrpcChannel leak, but there's a significantly simpler solution available.

   The Core Problem

   When options.Channel is pre-set (AzureManaged case), GetCallInvoker() returns default for the disposable reference (line 549 in
   GrpcDurableTaskClient.cs), meaning the channel is never disposed. Combined with IConfigureNamedOptions.Configure() being called on each
   retry, this creates a resource leak.

   Simpler Alternative

   Instead of introducing a full GrpcChannelCache class with locking, disposal tracking, and 400+ lines of test code, consider this much
   simpler fix:

   Option 1: Make the channel a Lazy<GrpcChannel> in the options class

     // In ConfigureGrpcChannel class
     class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions) :
         IConfigureNamedOptions<GrpcDurableTaskClientOptions>
     {
         readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new();

         public void Configure(string? name, GrpcDurableTaskClientOptions options)
         {
             string key = name ?? Options.DefaultName;
             DurableTaskSchedulerClientOptions source = schedulerOptions.Get(key);

             options.Channel = this.channels.GetOrAdd(
                 $"{key}:{source.EndpointAddress}:{source.TaskHubName}",
                 _ => new Lazy<GrpcChannel>(source.CreateChannel)).Value;
         }
     }

   This is ~10 lines vs 170+ lines for GrpcChannelCache, and the IConfigureOptions<T> is already registered as a singleton, so the channel
   cache lives in that singleton's state.

   Option 2: Cache directly on the scheduler options instance

   Since DurableTaskSchedulerClientOptions instances are already managed via the options system, add a lazily-created channel property there:

     // In DurableTaskSchedulerClientOptions
     Lazy<GrpcChannel>? cachedChannel;

     internal GrpcChannel GetOrCreateChannel()
     {
         this.cachedChannel ??= new Lazy<GrpcChannel>(this.CreateChannel);
         return this.cachedChannel.Value;
     }

   Concerns with Current Implementation

     - Unused methods: Replace() and TryRemove() in GrpcChannelCache are never called - they add complexity without value.
     - Fire-and-forget disposal: The DisposeChannelAsync pattern with _ = Task.Run(...) can orphan disposal if the app terminates quickly.
   Consider whether synchronous disposal is acceptable.
     - Over-engineering: The cache handles scenarios (explicit replacement, removal) that don't apply to this use case.
     - Code quality issues already flagged: Nested if statements, unused variable, generic catch clause.

   Recommendation

   Go with the simpler ConcurrentDictionary<string, Lazy<GrpcChannel>> approach directly in the ConfigureGrpcChannel class. This:

     - Fixes the leak
     - Requires no new classes
     - Has minimal test surface area
     - Is easier to understand and maintain

   If you need disposal support, implement IDisposable on ConfigureGrpcChannel and register it properly with DI (though the existing code
   doesn't dispose channels either, so this may be acceptable).

I can try to review this more deeply tomorrow.

- Move channel factory call outside lock to prevent deadlock
- Combine nested if statements in Replace method
- Use 'using' statement for channel disposal
- Catch Exception instead of bare catch
- Remove unused variable in test
- Remove separate GrpcChannelCache class
- Inline channel caching directly in ConfigureGrpcChannel using ConcurrentDictionary<string, Lazy<GrpcChannel>>
- Make ConfigureGrpcChannel implement IDisposable for proper channel disposal
- Remove unused Replace() and TryRemove() methods
- Add disposal verification tests
- Reduces complexity from 170+ LOC to ~40 LOC per extension
- Use LINQ Where() instead of if inside foreach for filtering channels
- Narrow catch (Exception) to specific types (OperationCanceledException, ObjectDisposedException)
options.Channel.Should().NotBeNull();

// Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels
provider.Dispose();

I see we're disposing the provider but there's no code that actually checks to see if the channel was disposed. I don't think we can rely on this test to know whether the disposal code is working as intended.

{
    // ShutdownAsync is the graceful way to close a gRPC channel.
    // Fire-and-forget but ensure the channel is eventually disposed.
    _ = Task.Run(async () =>

Why are we doing fire-and-forget here? This seems dangerous from a reliability perspective and makes it much harder to author reliable tests.

  /// <param name="schedulerOptions">Monitor for accessing the current scheduler options configuration.</param>
- class ConfigureGrpcChannel(IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions) :
-     IConfigureNamedOptions<GrpcDurableTaskWorkerOptions>
+ sealed class ConfigureGrpcChannel : IConfigureNamedOptions<GrpcDurableTaskWorkerOptions>, IDisposable

Are we able to use IAsyncDisposable instead of IDisposable? The gRPC channel uses an async method to shutdown so it would be better if our dispose code could call it in the proper async way.
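
One possible shape for the async disposal asked about here, as a sketch under assumptions: `StubAsyncChannel` is a hypothetical stand-in that exposes `ShutdownAsync()` and `Dispose()` the way this thread describes `GrpcChannel`, and `AsyncChannelOwner` is an illustrative name rather than the PR's class. It awaits shutdown instead of using fire-and-forget.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for GrpcChannel (assumption for illustration only).
sealed class StubAsyncChannel : IDisposable
{
    public bool ShutdownCalled { get; private set; }

    public bool IsDisposed { get; private set; }

    public Task ShutdownAsync()
    {
        this.ShutdownCalled = true;
        return Task.CompletedTask;
    }

    public void Dispose() => this.IsDisposed = true;
}

// Illustrative owner of cached channels; not the PR's ConfigureGrpcChannel class.
sealed class AsyncChannelOwner : IAsyncDisposable
{
    readonly ConcurrentDictionary<string, Lazy<StubAsyncChannel>> channels = new();
    int disposed;

    public StubAsyncChannel GetOrCreate(string key)
    {
        if (Volatile.Read(ref this.disposed) == 1)
        {
            throw new ObjectDisposedException(nameof(AsyncChannelOwner));
        }

        return this.channels.GetOrAdd(
            key, _ => new Lazy<StubAsyncChannel>(() => new StubAsyncChannel())).Value;
    }

    public async ValueTask DisposeAsync()
    {
        // Only the first caller performs the cleanup.
        if (Interlocked.Exchange(ref this.disposed, 1) == 1)
        {
            return;
        }

        foreach (Lazy<StubAsyncChannel> lazy in this.channels.Values)
        {
            if (!lazy.IsValueCreated)
            {
                continue;
            }

            StubAsyncChannel channel = lazy.Value;
            try
            {
                // Await graceful shutdown rather than scheduling it in the background.
                await channel.ShutdownAsync().ConfigureAwait(false);
            }
            finally
            {
                channel.Dispose();
            }
        }

        this.channels.Clear();
    }
}
```

Because the shutdown is awaited, a test can assert on the channel state immediately after `DisposeAsync()` completes.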

@torosent torosent marked this pull request as ready for review January 27, 2026 22:49
Copilot AI review requested due to automatic review settings January 27, 2026 22:49
Comment on lines +171 to +175
catch (Exception ex)
{
    exceptions ??= new List<Exception>();
    exceptions.Add(ex);
}
Comment on lines +174 to +178
catch (Exception ex)
{
    exceptions ??= new List<Exception>();
    exceptions.Add(ex);
}
Copilot AI left a comment

Pull request overview

Fixes a gRPC channel/handle leak in the AzureManaged Durable Task Scheduler integration by caching GrpcChannel instances and disposing them when the DI container is disposed.

Changes:

  • Cache GrpcChannel instances inside ConfigureGrpcChannel using ConcurrentDictionary<..., Lazy<GrpcChannel>>.
  • Make the channel configurators disposable so cached channels are cleaned up on ServiceProvider disposal.
  • Add tests intended to validate channel reuse and disposal behavior for both client and worker extensions.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 12 comments.

File | Description
src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs | Adds per-configuration GrpcChannel caching and async disposal logic for client-side options configuration.
src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs | Adds per-configuration GrpcChannel caching and async disposal logic for worker-side options configuration.
test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs | Adds tests for channel reuse/isolation and disposal on DI container teardown for client extensions.
test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs | Adds tests for channel reuse/isolation and disposal on DI container teardown for worker extensions.

Comment on lines +109 to +113
sealed class ConfigureGrpcChannel : IConfigureNamedOptions<GrpcDurableTaskWorkerOptions>, IAsyncDisposable
{
    readonly IOptionsMonitor<DurableTaskSchedulerWorkerOptions> schedulerOptions;
    readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new();
    int disposed;
Copilot AI Jan 27, 2026

PR description mentions IDisposable and a volatile disposed flag, but this implementation is IAsyncDisposable-only and the disposed field isn’t volatile (and is read without Volatile.Read). Either update the PR description or adjust the implementation to match (e.g., implement IDisposable and use volatile/Volatile.Read for the disposed check).

Copilot uses AI. Check for mistakes.
Comment on lines +107 to +111
sealed class ConfigureGrpcChannel : IConfigureNamedOptions<GrpcDurableTaskClientOptions>, IAsyncDisposable
{
    readonly IOptionsMonitor<DurableTaskSchedulerClientOptions> schedulerOptions;
    readonly ConcurrentDictionary<string, Lazy<GrpcChannel>> channels = new();
    int disposed;
Copilot AI Jan 27, 2026

PR description mentions IDisposable and a volatile disposed flag, but this implementation is IAsyncDisposable-only and the disposed field isn’t volatile (and is read without Volatile.Read). Either update the PR description or adjust the implementation to match (e.g., implement IDisposable and use volatile/Volatile.Read for the disposed check).

Comment on lines +296 to +301
ServiceProvider provider = services.BuildServiceProvider();

// Resolve options multiple times to trigger channel configuration
IOptionsMonitor<GrpcDurableTaskClientOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>();
GrpcDurableTaskClientOptions options1 = optionsMonitor.Get(Options.DefaultName);
GrpcDurableTaskClientOptions options2 = optionsMonitor.Get(Options.DefaultName);
Copilot AI Jan 27, 2026

This test calls IOptionsMonitor.Get() twice; OptionsMonitor caches options per name, so the second call typically returns the same options instance and won’t re-run Configure (so it doesn’t validate the new channel-caching behavior). Consider forcing new options creation (e.g., via IOptionsFactory/IOptionsSnapshot scopes) and disposing the ServiceProvider to avoid leaking channels/handlers during the test run.

Suggested change
- ServiceProvider provider = services.BuildServiceProvider();
- // Resolve options multiple times to trigger channel configuration
- IOptionsMonitor<GrpcDurableTaskClientOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskClientOptions>>();
- GrpcDurableTaskClientOptions options1 = optionsMonitor.Get(Options.DefaultName);
- GrpcDurableTaskClientOptions options2 = optionsMonitor.Get(Options.DefaultName);
+ using ServiceProvider provider = services.BuildServiceProvider();
+ // Resolve options multiple times to trigger channel configuration
+ IOptionsFactory<GrpcDurableTaskClientOptions> optionsFactory = provider.GetRequiredService<IOptionsFactory<GrpcDurableTaskClientOptions>>();
+ GrpcDurableTaskClientOptions options1 = optionsFactory.Create(Options.DefaultName);
+ GrpcDurableTaskClientOptions options2 = optionsFactory.Create(Options.DefaultName);

Comment on lines +214 to +219
ServiceProvider provider = services.BuildServiceProvider();

// Resolve options multiple times to trigger channel configuration
IOptionsMonitor<GrpcDurableTaskWorkerOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskWorkerOptions>>();
GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get(Options.DefaultName);
GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get(Options.DefaultName);
Copilot AI Jan 27, 2026

This test calls IOptionsMonitor.Get() twice; OptionsMonitor caches options per name, so the second call typically returns the same options instance and won’t re-run Configure (so it doesn’t validate the new channel-caching behavior). Consider forcing new options creation (e.g., via IOptionsFactory/IOptionsSnapshot scopes) and disposing the ServiceProvider to avoid leaking channels/handlers during the test run.

Suggested change
- ServiceProvider provider = services.BuildServiceProvider();
- // Resolve options multiple times to trigger channel configuration
- IOptionsMonitor<GrpcDurableTaskWorkerOptions> optionsMonitor = provider.GetRequiredService<IOptionsMonitor<GrpcDurableTaskWorkerOptions>>();
- GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get(Options.DefaultName);
- GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get(Options.DefaultName);
+ using ServiceProvider provider = services.BuildServiceProvider();
+ // Resolve options multiple times to trigger channel configuration via new options instances
+ IOptionsFactory<GrpcDurableTaskWorkerOptions> optionsFactory = provider.GetRequiredService<IOptionsFactory<GrpcDurableTaskWorkerOptions>>();
+ GrpcDurableTaskWorkerOptions options1 = optionsFactory.Create(Options.DefaultName);
+ GrpcDurableTaskWorkerOptions options2 = optionsFactory.Create(Options.DefaultName);

Comment on lines +240 to +243
// Act - configure two different named workers with different endpoints
mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential);
mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential);
ServiceProvider provider = services.BuildServiceProvider();
Copilot AI Jan 27, 2026

This test uses different endpoints for different named options, so it will pass even if the cache key accidentally ignores the options name. To validate name isolation in the cache key, use the same endpoint/task hub for both names and assert the channels differ; also dispose the ServiceProvider to avoid leaking channels.

Suggested change
- // Act - configure two different named workers with different endpoints
- mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential);
- mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential);
- ServiceProvider provider = services.BuildServiceProvider();
+ // Act - configure two different named workers with the same endpoint and task hub
+ mockBuilder1.Object.UseDurableTaskScheduler("endpoint.westus3.durabletask.io", ValidTaskHub, credential);
+ mockBuilder2.Object.UseDurableTaskScheduler("endpoint.westus3.durabletask.io", ValidTaskHub, credential);
+ using ServiceProvider provider = services.BuildServiceProvider();

Comment on lines +181 to +184
if (exceptions is { Count: > 0 })
{
    throw new AggregateException(exceptions);
}
Copilot AI Jan 27, 2026

DisposeAsync throws an AggregateException when any channel shutdown/dispose fails. Throwing from ServiceProvider disposal can surface as app shutdown failures and is difficult for callers to handle. Consider making this best-effort (swallow/log disposal errors) instead of throwing.

Comment on lines +322 to +326
// Act - configure two different named clients with different endpoints
mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential);
mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential);
ServiceProvider provider = services.BuildServiceProvider();

Copilot AI Jan 27, 2026

This test uses different endpoints for different named options, so it will pass even if the cache key accidentally ignores the options name. To validate name isolation in the cache key, use the same endpoint/task hub for both names and assert the channels differ; also dispose the ServiceProvider to avoid leaking channels.

// Create a cache key based on the options name, endpoint, and task hub.
// This ensures channels are reused for the same configuration
// but separate channels are created for different configurations.
string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}";
Copilot AI Jan 27, 2026

The channel cache key is built by concatenating strings with ':' delimiters, but EndpointAddress commonly contains ':' (e.g., "https://" or ports). This can create ambiguous keys and potential collisions. Prefer a composite key type (e.g., ValueTuple/record struct) rather than a delimited string.

Suggested change
- string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}";
+ // Use a delimiter character (\u001F) that will not appear in typical endpoint URIs.
+ string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}";
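
The composite-key alternative mentioned in this comment could look roughly like the sketch below. It is illustrative only: `CacheKeys` and `TupleKeyedCache` are hypothetical names, not types in this PR, and the cache is generic so the example does not depend on `GrpcChannel`.

```csharp
using System;
using System.Collections.Concurrent;

static class CacheKeys
{
    // Delimited form, as in the code under review.
    public static string Delimited(string name, string endpoint, string taskHub) =>
        $"{name}:{endpoint}:{taskHub}";

    // Composite form: each component is compared separately,
    // so a ':' inside a component cannot blur the boundaries.
    public static (string Name, string Endpoint, string TaskHub) Composite(
        string name, string endpoint, string taskHub) => (name, endpoint, taskHub);
}

// Illustrative cache keyed by a value tuple instead of a delimited string.
sealed class TupleKeyedCache<TValue>
{
    readonly ConcurrentDictionary<(string Name, string Endpoint, string TaskHub), Lazy<TValue>> items = new();

    public TValue GetOrCreate(string name, string endpoint, string taskHub, Func<TValue> factory) =>
        this.items.GetOrAdd((name, endpoint, taskHub), _ => new Lazy<TValue>(factory)).Value;
}
```

With the delimited form, the inputs `("a", "b:c", "d")` and `("a:b", "c", "d")` both produce `a:b:c:d`, while the tuple keys for those inputs compare as different.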

Comment on lines +146 to +150
string optionsName = name ?? Options.DefaultName;
DurableTaskSchedulerWorkerOptions source = this.schedulerOptions.Get(optionsName);

// Create a cache key based on the options name, endpoint, and task hub.
// This ensures channels are reused for the same configuration
Copilot AI Jan 27, 2026

CreateChannel() behavior depends on more than endpoint/task hub (e.g., ResourceId, Credential, AllowInsecureCredentials, and WorkerId via the call credentials interceptor). If any of these values change while EndpointAddress/TaskHubName stay the same (e.g., via options reload), the cached channel will be reused with stale settings. Consider including these fields in the cache key or enforcing immutability for them.

Comment on lines +184 to +187
if (exceptions is { Count: > 0 })
{
    throw new AggregateException(exceptions);
}
Copilot AI Jan 27, 2026

DisposeAsync throws an AggregateException when any channel shutdown/dispose fails. Throwing from ServiceProvider disposal can surface as app shutdown failures and is difficult for callers to handle. Consider making this best-effort (swallow/log disposal errors) instead of throwing.
